package com.dahuan.sink;

import com.dahuan.bean.SensorReading;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

import java.util.Properties;

public class Kafka_Sink {
    /**
     * Flink streaming job: consumes CSV sensor records ("id,timestamp,temperature")
     * from the Kafka topic {@code sensor}, parses each record into a
     * {@link SensorReading}, and writes its string form to the Kafka topic
     * {@code sinktest} on the same local broker.
     *
     * @param args unused
     * @throws Exception if the Flink job fails to start or execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Single parallel subtask keeps record order and simplifies local testing.
        env.setParallelism(1);

        // Kafka consumer configuration.
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "localhost:9092");
        prop.setProperty("group.id", "consumer-group");
        prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("auto.offset.reset", "latest");

        // Source: raw CSV strings from the "sensor" topic.
        DataStreamSource<String> inputStream =
                env.addSource(new FlinkKafkaConsumer011<>("sensor", new SimpleStringSchema(), prop));

        // Transform: parse "id,timestamp,temperature" into a SensorReading and
        // re-serialize via toString(). parseLong/parseDouble replace the
        // deprecated boxed-type constructors (new Long(...) / new Double(...)).
        DataStream<String> dataStream = inputStream.map(data -> {
            String[] fields = data.split(",");
            return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]))
                    .toString();
        });

        // Sink: write the transformed records to the "sinktest" topic.
        dataStream.addSink(new FlinkKafkaProducer011<>("localhost:9092", "sinktest", new SimpleStringSchema()));

        env.execute("KafkaSink");
    }
}
